---
title: "Workflow Pattern Composition"
description: "Advanced patterns for composing and orchestrating complex agent workflows"
---

<Info>
  Learn how to combine multiple workflow patterns, create nested workflows, and implement advanced coordination patterns for sophisticated agent systems.
</Info>

## Pattern Composition Overview

Workflow pattern composition allows you to build complex agent systems by combining simpler, well-tested patterns. This approach provides:

<CardGroup cols={2}>
  <Card title="Modularity" icon="puzzle-piece">
    Build complex workflows from reusable components
  </Card>
  <Card title="Testability" icon="flask">
    Test individual patterns in isolation
  </Card>
  <Card title="Maintainability" icon="wrench">
    Update and evolve patterns independently
  </Card>
  <Card title="Scalability" icon="chart-line">
    Scale different patterns based on workload
  </Card>
</CardGroup>

## Combining Multiple Patterns

### Sequential Pattern Composition

Chain workflow patterns so that each stage consumes the output of the previous one:

```python
import asyncio
from datetime import datetime

from mcp_agent.app import MCPApp
from mcp_agent.executor.workflow import Workflow, WorkflowResult
from mcp_agent.agents.agent import Agent
from mcp_agent.workflows.llm.augmented_llm_openai import OpenAIAugmentedLLM

app = MCPApp(name="composed_agent")

@app.workflow
class DataPipelineWorkflow(Workflow[dict]):
    """Combines extraction, validation, processing, and reporting patterns."""
    
    @app.workflow_run
    async def run(self, source_config: dict) -> WorkflowResult[dict]:
        pipeline_results = {}
        
        # Step 1: Data Extraction Pattern
        extraction_result = await self.extract_data(source_config)
        pipeline_results["extraction"] = extraction_result
        
        # Step 2: Data Validation Pattern
        validation_result = await self.validate_data(extraction_result)
        pipeline_results["validation"] = validation_result
        
        # Step 3: Parallel Processing Pattern
        processing_result = await self.process_data_parallel(validation_result)
        pipeline_results["processing"] = processing_result
        
        # Step 4: Aggregation and Reporting Pattern
        report = await self.generate_report(processing_result)
        pipeline_results["report"] = report
        
        return WorkflowResult(value=pipeline_results)
    
    async def extract_data(self, config: dict) -> dict:
        """Data extraction workflow pattern."""
        extractor_agent = Agent(
            name="data_extractor",
            instruction="Extract data from various sources with high reliability.",
            server_names=["database", "api", "filesystem"]
        )
        
        async with extractor_agent:
            llm = await extractor_agent.attach_llm(OpenAIAugmentedLLM)
            
            # Extract from multiple sources
            sources = config.get("sources", [])
            extracted_data = []
            
            for source in sources:
                extraction = await llm.generate_str(
                    f"Extract data from {source['type']}: {source['location']}"
                )
                extracted_data.append({
                    "source": source,
                    "data": extraction,
                    "timestamp": datetime.utcnow().isoformat()
                })
            
            return {
                "extracted_items": extracted_data,
                "total_sources": len(sources)
            }
    
    async def validate_data(self, extracted_data: dict) -> dict:
        """Data validation workflow pattern."""
        validator_agent = Agent(
            name="data_validator", 
            instruction="Validate data quality and consistency.",
            server_names=["validation_service"]
        )
        
        async with validator_agent:
            llm = await validator_agent.attach_llm(OpenAIAugmentedLLM)
            
            validated_items = []
            validation_errors = []
            
            for item in extracted_data["extracted_items"]:
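                validation = await llm.generate_str(
                    "Validate data quality and schema. Start the reply with "
                    f"VALID or INVALID, then explain: {item['data']}"
                )
                
                # Test the leading verdict: a substring check for "valid"
                # would also match "invalid".
                if validation.strip().upper().startswith("VALID"):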
                    validated_items.append(item)
                else:
                    validation_errors.append({
                        "item": item,
                        "error": validation
                    })
            
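            # Guard against division by zero when nothing was extracted
            total_items = len(extracted_data["extracted_items"])
            return {
                "valid_items": validated_items,
                "errors": validation_errors,
                "validation_rate": len(validated_items) / total_items if total_items else 0.0
            }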
    
    async def process_data_parallel(self, validated_data: dict) -> dict:
        """Parallel processing workflow pattern."""
        import asyncio
        
        async def process_item(item):
            processor_agent = Agent(
                name=f"processor_{item['source']['type']}", 
                instruction="Process and enrich data items.",
                server_names=["ml_service", "enrichment_api"]
            )
            
            async with processor_agent:
                llm = await processor_agent.attach_llm(OpenAIAugmentedLLM)
                processed = await llm.generate_str(
                    f"Process and enrich: {item['data']}"
                )
                
                return {
                    "original": item,
                    "processed": processed,
                    "processing_timestamp": datetime.utcnow().isoformat()
                }
        
        # Process all valid items in parallel
        processing_tasks = [
            process_item(item) 
            for item in validated_data["valid_items"]
        ]
        
        processed_results = await asyncio.gather(*processing_tasks)
        
        return {
            "processed_items": processed_results,
            "processing_count": len(processed_results)
        }
    
    async def generate_report(self, processed_data: dict) -> dict:
        """Report generation workflow pattern."""
        reporter_agent = Agent(
            name="report_generator",
            instruction="Generate comprehensive reports from processed data.",
            server_names=["reporting_service", "filesystem"]
        )
        
        async with reporter_agent:
            llm = await reporter_agent.attach_llm(OpenAIAugmentedLLM)
            
            summary = await llm.generate_str(
                f"Generate executive summary for {len(processed_data['processed_items'])} processed items"
            )
            
            detailed_report = await llm.generate_str(
                f"Create detailed analysis report: {processed_data}"
            )
            
            return {
                "summary": summary,
                "detailed_report": detailed_report,
                "report_timestamp": datetime.utcnow().isoformat(),
                "items_processed": processed_data["processing_count"]
            }
```
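
The four stages above share one shape: each step takes the previous step's output, and its result is recorded under the step's name. The sketch below shows that shape in plain `asyncio`, without any mcp-agent dependency. `run_pipeline` and the three stub stages are hypothetical names introduced for illustration; in a real workflow each stage would wrap an agent call like the ones above.

```python
import asyncio
from typing import Any, Awaitable, Callable

# A stage is any async callable that maps the previous output to a new one.
Stage = Callable[[Any], Awaitable[Any]]


async def run_pipeline(stages: list[tuple[str, Stage]], initial: Any) -> dict[str, Any]:
    """Run stages in order, feeding each stage the previous stage's output."""
    results: dict[str, Any] = {}
    current = initial
    for name, stage in stages:
        current = await stage(current)
        results[name] = current  # keep every intermediate result
    return results


# Stub stages (hypothetical) standing in for the agent-backed methods.
async def extract(config: dict) -> list[str]:
    return [f"raw:{source}" for source in config["sources"]]


async def validate(items: list[str]) -> list[str]:
    return [item for item in items if item.startswith("raw:")]


async def report(items: list[str]) -> dict:
    return {"items_processed": len(items)}


pipeline_results = asyncio.run(
    run_pipeline(
        [("extraction", extract), ("validation", validate), ("report", report)],
        {"sources": ["db", "api"]},
    )
)
print(pipeline_results["report"])
```

Keeping the stage list as data makes it straightforward to reorder stages or test one in isolation, at the cost of looser typing between stages.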

### Parallel Pattern Composition

Run independent patterns concurrently, then merge their results:

```python
@app.workflow
class MultiAnalysisWorkflow(Workflow[dict]):
    """Run multiple analysis patterns in parallel."""
    
    @app.workflow_run
    async def run(self, document: str) -> WorkflowResult[dict]:
        # Launch the analysis patterns concurrently. Only the sentiment and
        # entity patterns are defined below; the other three are not shown.
        sentiment, entities, topics, quality, summary = await asyncio.gather(
            self.sentiment_analysis_pattern(document),
            self.entity_extraction_pattern(document),
            self.topic_modeling_pattern(document),
            self.quality_assessment_pattern(document),
            self.summarization_pattern(document)
        )
        
        # Combine results from all patterns
        combined_results = {
            "sentiment": sentiment,
            "entities": entities,
            "topics": topics,
            "quality": quality,
            "summary": summary,
            "analysis_timestamp": datetime.utcnow().isoformat()
        }
        
        # Generate meta-analysis
        meta_analysis = await self.meta_analysis_pattern(combined_results)
        combined_results["meta_analysis"] = meta_analysis
        
        return WorkflowResult(value=combined_results)
    
    async def sentiment_analysis_pattern(self, text: str) -> dict:
        """Sentiment analysis workflow pattern."""
        sentiment_agent = Agent(
            name="sentiment_analyzer",
            instruction="Analyze text sentiment with nuanced understanding.",
            server_names=["sentiment_api", "ml_service"]
        )
        
        async with sentiment_agent:
            llm = await sentiment_agent.attach_llm(OpenAIAugmentedLLM)
            
            # Primary sentiment analysis
            primary_sentiment = await llm.generate_str(
                f"Analyze overall sentiment of this text: {text[:500]}..."
            )
            
            # Aspect-based sentiment
            aspects_sentiment = await llm.generate_str(
                f"Analyze sentiment for key aspects/topics in: {text[:500]}..."
            )
            
            # Confidence scoring
            confidence = await llm.generate_str(
                f"Rate confidence in sentiment analysis (0-100): {primary_sentiment}"
            )
            
            return {
                "primary_sentiment": primary_sentiment,
                "aspects": aspects_sentiment,
                "confidence": confidence,
                "pattern": "sentiment_analysis"
            }
    
    async def entity_extraction_pattern(self, text: str) -> dict:
        """Named entity recognition workflow pattern."""
        entity_agent = Agent(
            name="entity_extractor",
            instruction="Extract and classify entities with high precision.",
            server_names=["ner_service", "knowledge_graph"]
        )
        
        async with entity_agent:
            llm = await entity_agent.attach_llm(OpenAIAugmentedLLM)
            
            # Extract entities
            entities = await llm.generate_str(
                f"Extract named entities (people, places, organizations, etc.): {text[:500]}..."
            )
            
            # Entity relationships
            relationships = await llm.generate_str(
                f"Identify relationships between entities: {entities}"
            )
            
            # Entity disambiguation
            disambiguated = await llm.generate_str(
                f"Disambiguate entities using context: {entities}"
            )
            
            return {
                "entities": entities,
                "relationships": relationships,
                "disambiguated": disambiguated,
                "pattern": "entity_extraction"
            }
    
    async def meta_analysis_pattern(self, all_analyses: dict) -> dict:
        """Meta-analysis pattern to synthesize insights."""
        meta_agent = Agent(
            name="meta_analyzer",
            instruction="Synthesize insights from multiple analysis patterns.",
            server_names=["synthesis_engine"]
        )
        
        async with meta_agent:
            llm = await meta_agent.attach_llm(OpenAIAugmentedLLM)
            
            synthesis = await llm.generate_str(
                f"Synthesize key insights from multiple analyses: {all_analyses}"
            )
            
            confidence_assessment = await llm.generate_str(
                f"Assess overall confidence in the combined analysis results: {synthesis}"
            )
            
            recommendations = await llm.generate_str(
                f"Generate actionable recommendations based on synthesis: {synthesis}"
            )
            
            return {
                "synthesis": synthesis,
                "confidence": confidence_assessment,
                "recommendations": recommendations,
                "pattern": "meta_analysis"
            }
```
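
A plain `asyncio.gather` call has two properties worth knowing when composing patterns: results come back by position, and without `return_exceptions=True` the first exception propagates to the caller, so the results of the other patterns are lost. The sketch below is one possible way to key outcomes by pattern name and isolate failures. `run_named` and the stub patterns are hypothetical and not part of mcp-agent.

```python
import asyncio
from typing import Any, Awaitable, Callable


async def run_named(patterns: dict[str, Callable[[], Awaitable[Any]]]) -> dict[str, Any]:
    """Run every pattern concurrently and key each outcome by pattern name."""
    names = list(patterns)
    outcomes = await asyncio.gather(
        *(patterns[name]() for name in names),
        return_exceptions=True,  # a failing pattern should not discard the rest
    )
    results: dict[str, Any] = {}
    for name, outcome in zip(names, outcomes):
        if isinstance(outcome, BaseException):
            results[name] = {"error": str(outcome)}
        else:
            results[name] = outcome
    return results


# Stub patterns (hypothetical) standing in for agent-backed analyses.
async def sentiment() -> dict:
    return {"label": "positive"}


async def entities() -> dict:
    raise RuntimeError("ner_service unavailable")


combined = asyncio.run(run_named({"sentiment": sentiment, "entities": entities}))
print(combined)
```

Whether a failed pattern should degrade the result or abort the workflow is a design decision; this sketch chooses degradation and leaves the error visible to the meta-analysis step.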

## Nested Workflow Patterns

### Hierarchical Workflow Composition

Create workflows that spawn child workflows:

```python
@app.workflow
class ProjectManagementWorkflow(Workflow[dict]):
    """Master workflow that orchestrates project execution."""
    
    @app.workflow_run
    async def run(self, project_config: dict) -> WorkflowResult[dict]:
        project_results = {}
        
        # Phase 1: Project Planning (Child Workflow)
        planning_handle = await self.start_child_workflow(
            PlanningWorkflow,
            project_config,
            workflow_id=f"planning-{project_config['project_id']}"
        )
        project_results["planning"] = await planning_handle.result()
        
        # Phase 2: Resource Allocation (Child Workflow)
        resources_handle = await self.start_child_workflow(
            ResourceAllocationWorkflow,
            {
                "project_plan": project_results["planning"],
                "budget": project_config["budget"]
            },
            workflow_id=f"resources-{project_config['project_id']}"
        )
        project_results["resources"] = await resources_handle.result()
        
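        # Phase 3: Parallel Task Execution (Multiple Child Workflows)
        # Assumes the planning result exposes "tasks" as a list of dicts with
        # "id" and "type" keys; raw LLM text must be parsed into that shape first.
        task_handles = []
        tasks = project_results["planning"]["tasks"]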
        
        for task in tasks:
            task_handle = await self.start_child_workflow(
                TaskExecutionWorkflow,
                {
                    "task": task,
                    "resources": project_results["resources"],
                    "project_context": project_config
                },
                workflow_id=f"task-{project_config['project_id']}-{task['id']}"
            )
            task_handles.append(task_handle)
        
        # Wait for all tasks to complete
        task_results = []
        for handle in task_handles:
            result = await handle.result()
            task_results.append(result)
        
        project_results["tasks"] = task_results
        
        # Phase 4: Project Closure (Child Workflow)
        closure_handle = await self.start_child_workflow(
            ProjectClosureWorkflow,
            {
                "project_results": project_results,
                "original_config": project_config
            },
            workflow_id=f"closure-{project_config['project_id']}"
        )
        project_results["closure"] = await closure_handle.result()
        
        return WorkflowResult(value=project_results)

@app.workflow
class PlanningWorkflow(Workflow[dict]):
    """Child workflow for project planning."""
    
    @app.workflow_run
    async def run(self, project_config: dict) -> WorkflowResult[dict]:
        planner_agent = Agent(
            name="project_planner",
            instruction="Create detailed project plans with task breakdown.",
            server_names=["project_mgmt", "resource_db"]
        )
        
        async with planner_agent:
            llm = await planner_agent.attach_llm(OpenAIAugmentedLLM)
            
            # Analyze project requirements
            requirements = await llm.generate_str(
                f"Analyze project requirements: {project_config}"
            )
            
            # Create task breakdown structure
            task_breakdown = await llm.generate_str(
                f"Create detailed task breakdown: {requirements}"
            )
            
            # Estimate timeline and dependencies
            timeline = await llm.generate_str(
                f"Create project timeline with dependencies: {task_breakdown}"
            )
            
            # Risk assessment
            risks = await llm.generate_str(
                f"Identify project risks and mitigation strategies: {project_config}"
            )
            
            return WorkflowResult(value={
                "requirements": requirements,
                # Raw LLM text: parse it into a list of task dicts before the
                # parent workflow iterates over it.
                "tasks": task_breakdown,
                "timeline": timeline,
                "risks": risks,
                "planning_completed": datetime.utcnow().isoformat()
            })

@app.workflow
class TaskExecutionWorkflow(Workflow[dict]):
    """Child workflow for individual task execution."""
    
    @app.workflow_run
    async def run(self, task_data: dict) -> WorkflowResult[dict]:
        task = task_data["task"]
        
        # Task-specific agent
        executor_agent = Agent(
            name=f"task_executor_{task['type']}",
            instruction=f"Execute {task['type']} tasks efficiently and thoroughly.",
            server_names=task.get("required_services", ["general"])
        )
        
        async with executor_agent:
            llm = await executor_agent.attach_llm(OpenAIAugmentedLLM)
            
            # Execute task with progress tracking
            execution_result = await llm.generate_str(
                f"Execute task: {task} with resources: {task_data['resources']}"
            )
            
            # Quality check
            quality_check = await llm.generate_str(
                f"Perform quality check on task execution: {execution_result}"
            )
            
            # Generate deliverable
            deliverable = await llm.generate_str(
                f"Create task deliverable: {execution_result}"
            )
            
            return WorkflowResult(value={
                "task_id": task["id"],
                "execution_result": execution_result,
                "quality_check": quality_check,
                "deliverable": deliverable,
                "completion_time": datetime.utcnow().isoformat()
            })
```
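
The parent workflow iterates over `project_results["planning"]["tasks"]` and reads `task['id']`, which only works once the planner's text output has been turned into a list of dictionaries. One way to do that, assuming the planner is prompted to reply with a JSON array, is to parse and validate the reply before fanning out. `parse_task_list` and its required keys are hypothetical, chosen to match the fields the example reads (`id`, `type`).

```python
import json


def parse_task_list(raw: str, required_keys: tuple[str, ...] = ("id", "type")) -> list[dict]:
    """Parse an LLM reply that should contain a JSON array of task objects."""
    # Models often wrap JSON in prose, so keep only the outermost array.
    start, end = raw.find("["), raw.rfind("]")
    if start == -1 or end <= start:
        raise ValueError("no JSON array found in planner output")

    # json.JSONDecodeError is a ValueError subclass, so callers catch one type.
    tasks = json.loads(raw[start : end + 1])

    for index, task in enumerate(tasks):
        if not isinstance(task, dict):
            raise ValueError(f"task {index} is not an object")
        missing = [key for key in required_keys if key not in task]
        if missing:
            raise ValueError(f"task {index} is missing keys: {missing}")
    return tasks


reply = 'Here is the plan:\n[{"id": "t1", "type": "design"}, {"id": "t2", "type": "build"}]'
tasks = parse_task_list(reply)
print([task["id"] for task in tasks])
```

Failing early here keeps a malformed plan from spawning child workflows with unusable inputs.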

## Dynamic Workflow Composition

### Runtime Pattern Selection

Choose workflow patterns based on runtime conditions:

```python
@app.workflow  
class AdaptiveAnalysisWorkflow(Workflow[dict]):
    """Dynamically selects analysis patterns based on input characteristics."""
    
    @app.workflow_run
    async def run(self, content: dict) -> WorkflowResult[dict]:
        # Analyze input to determine optimal patterns
        content_analysis = await self.analyze_content_characteristics(content)
        
        # Select appropriate patterns based on characteristics
        selected_patterns = await self.select_patterns(content_analysis)
        
        # Execute selected patterns dynamically
        pattern_results = {}
        for pattern_name in selected_patterns:
            result = await self.execute_pattern(pattern_name, content)
            pattern_results[pattern_name] = result
        
        # Synthesize results
        final_result = await self.synthesize_results(pattern_results, content_analysis)
        
        return WorkflowResult(value=final_result)
    
    async def analyze_content_characteristics(self, content: dict) -> dict:
        """Analyze input to determine its characteristics."""
        analyzer_agent = Agent(
            name="content_analyzer",
            instruction="Analyze content characteristics to guide processing strategy.",
            server_names=["analysis_service"]
        )
        
        async with analyzer_agent:
            llm = await analyzer_agent.attach_llm(OpenAIAugmentedLLM)
            
            characteristics = await llm.generate_str(f"""
            Analyze these content characteristics:
            1. Content type and format
            2. Length and complexity
            3. Language and domain
            4. Required processing depth
            5. Time sensitivity
            
            Content: {content}
            """)
            
            return {"characteristics": characteristics, "content_type": content.get("type")}
    
    async def select_patterns(self, content_analysis: dict) -> list[str]:
        """Select optimal patterns based on content analysis."""
        selector_agent = Agent(
            name="pattern_selector",
            instruction="Select optimal processing patterns based on content analysis.",
            server_names=["decision_engine"]
        )
        
        async with selector_agent:
            llm = await selector_agent.attach_llm(OpenAIAugmentedLLM)
            
            pattern_selection = await llm.generate_str(f"""
            Based on this content analysis, select the most appropriate processing patterns:
            
            Available patterns:
            - detailed_analysis: Deep, comprehensive analysis (slow, thorough)
            - rapid_analysis: Quick insights extraction (fast, basic)
            - multilingual_analysis: Language-specific processing
            - technical_analysis: Domain-specific technical processing
            - sentiment_analysis: Emotion and opinion analysis
            - factual_analysis: Fact-checking and verification
            - comparative_analysis: Comparison with reference materials
            
            Analysis: {content_analysis}
            
            Return comma-separated list of selected patterns.
            """)
            
            # Parse selected patterns, normalizing case and dropping empty entries
            selected = [
                p.strip().lower()
                for p in pattern_selection.split(",")
                if p.strip()
            ]
            return selected
    
    async def execute_pattern(self, pattern_name: str, content: dict) -> dict:
        """Execute a specific analysis pattern."""
        # Resolve the executor by name at call time. A dict literal of bound
        # methods would raise AttributeError for any pattern method that is
        # not defined on this class (only two are shown below).
        known_patterns = {
            "detailed_analysis",
            "rapid_analysis",
            "multilingual_analysis",
            "technical_analysis",
            "sentiment_analysis",
            "factual_analysis",
            "comparative_analysis"
        }
        
        executor = None
        if pattern_name in known_patterns:
            executor = getattr(self, f"{pattern_name}_pattern", None)
        
        if executor is None:
            return {"error": f"Unknown pattern: {pattern_name}"}
        return await executor(content)
    
    async def detailed_analysis_pattern(self, content: dict) -> dict:
        """Comprehensive analysis pattern."""
        detailed_agent = Agent(
            name="detailed_analyzer",
            instruction="Perform thorough, comprehensive analysis with deep insights.",
            server_names=["deep_analysis", "knowledge_base", "ml_service"]
        )
        
        async with detailed_agent:
            llm = await detailed_agent.attach_llm(OpenAIAugmentedLLM)
            
            # Multi-stage deep analysis
            structural_analysis = await llm.generate_str(f"Deep structural analysis: {content}")
            contextual_analysis = await llm.generate_str(f"Contextual analysis: {structural_analysis}")
            implications = await llm.generate_str(f"Derive implications: {contextual_analysis}")
            
            return {
                "pattern": "detailed_analysis",
                "structural": structural_analysis,
                "contextual": contextual_analysis,
                "implications": implications,
                "depth": "comprehensive"
            }
    
    async def rapid_analysis_pattern(self, content: dict) -> dict:
        """Quick analysis pattern for time-sensitive processing."""
        rapid_agent = Agent(
            name="rapid_analyzer",
            instruction="Provide quick, essential insights with time efficiency.",
            server_names=["fast_analysis"]
        )
        
        async with rapid_agent:
            llm = await rapid_agent.attach_llm(OpenAIAugmentedLLM)
            
            quick_insights = await llm.generate_str(f"Quick key insights: {content}")
            
            return {
                "pattern": "rapid_analysis", 
                "insights": quick_insights,
                "depth": "surface"
            }
```
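
Because the pattern list comes from free-form LLM text, it helps to check it against the set of available patterns before dispatching. The sketch below is one possible parser: it tolerates list markers, trailing punctuation, and mixed case, drops unknown or duplicate names, and falls back to a default when nothing usable remains. `parse_pattern_selection` and the default choice are hypothetical, not part of mcp-agent.

```python
def parse_pattern_selection(
    raw: str,
    available: set[str],
    default: str = "rapid_analysis",
) -> list[str]:
    """Turn a comma-separated LLM reply into a de-duplicated list of known patterns."""
    selected: list[str] = []
    # Treat newlines as separators too, since models often reply with bullets.
    for part in raw.replace("\n", ",").split(","):
        name = part.strip().strip("-*'\" .").lower()
        if name in available and name not in selected:
            selected.append(name)
    # Fall back to a cheap default rather than running nothing.
    return selected or [default]


AVAILABLE = {"detailed_analysis", "rapid_analysis", "sentiment_analysis"}

reply = "Sentiment_Analysis, rapid_analysis.\n- detailed_analysis, made_up_pattern, rapid_analysis"
print(parse_pattern_selection(reply, AVAILABLE))
print(parse_pattern_selection("I cannot decide", AVAILABLE))
```

Filtering against an allowlist also means `execute_pattern` never has to handle a name the model invented.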

## State Sharing Between Workflows

### Shared State Management

Keep shared state on the workflow instance so each pattern can read earlier results and record its own:

```python
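from typing import Dict, Any

# workflow.info() below is the Temporal SDK call, so this example assumes
# the workflow runs on the Temporal execution engine.
from temporalio import workflow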

@app.workflow
class StatefulOrchestrator(Workflow[dict]):
    """Orchestrator that maintains shared state across patterns."""
    
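    def __init__(self, *args, **kwargs):
        # Forward constructor arguments so the base Workflow is initialized
        super().__init__(*args, **kwargs)
        self.shared_state: Dict[str, Any] = {
            "global_context": {},
            "pattern_results": {},
            "workflow_metadata": {},
            "communication_log": []
        }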
    
    @app.workflow_run
    async def run(self, initial_data: dict) -> WorkflowResult[dict]:
        # Initialize shared state with a copy so the caller's dict is not mutated
        self.shared_state["global_context"] = dict(initial_data)
        self.shared_state["workflow_metadata"] = {
            "start_time": datetime.utcnow().isoformat(),
            "workflow_id": workflow.info().workflow_id,
            "run_id": workflow.info().run_id
        }
        
        # Execute patterns with shared state
        await self.execute_data_collection_pattern()
        await self.execute_processing_patterns()
        await self.execute_synthesis_pattern()
        
        return WorkflowResult(value={
            "final_state": self.shared_state,
            "execution_summary": await self.generate_execution_summary()
        })
    
    async def execute_data_collection_pattern(self):
        """Data collection pattern that updates shared state."""
        collector_agent = Agent(
            name="data_collector",
            instruction="Collect data and update shared context.",
            server_names=["data_sources"]
        )
        
        async with collector_agent:
            llm = await collector_agent.attach_llm(OpenAIAugmentedLLM)
            
            # Collect data based on current context
            collected_data = await llm.generate_str(
                f"Collect relevant data based on context: {self.shared_state['global_context']}"
            )
            
            # Update shared state
            self.shared_state["pattern_results"]["data_collection"] = {
                "collected_data": collected_data,
                "timestamp": datetime.utcnow().isoformat(),
                "status": "completed"
            }
            
            # Update global context with new data
            self.shared_state["global_context"]["collected_data"] = collected_data
            
            # Log communication
            self.shared_state["communication_log"].append({
                "pattern": "data_collection",
                "action": "state_update",
                "timestamp": datetime.utcnow().isoformat(),
                "data_keys": list(self.shared_state["pattern_results"]["data_collection"].keys())
            })
    
    async def execute_processing_patterns(self):
        """Execute multiple processing patterns that share state."""
        # Pattern 1: Analysis
        await self.execute_analysis_pattern()
        
        # Pattern 2: Validation (uses analysis results)
        await self.execute_validation_pattern()
        
        # Pattern 3: Enhancement (uses both previous results)
        await self.execute_enhancement_pattern()
    
    async def execute_analysis_pattern(self):
        """Analysis pattern that reads and updates shared state."""
        analysis_agent = Agent(
            name="analyzer",
            instruction="Analyze data using shared context and state.",
            server_names=["analysis_service"]
        )
        
        async with analysis_agent:
            llm = await analysis_agent.attach_llm(OpenAIAugmentedLLM)
            
            # Use shared state for analysis
            current_context = self.shared_state["global_context"]
            previous_results = self.shared_state.get("pattern_results", {})
            
            analysis_result = await llm.generate_str(f"""
            Perform analysis using shared context:
            Context: {current_context}
            Previous Results: {previous_results}
            """)
            
            # Update shared state with analysis
            self.shared_state["pattern_results"]["analysis"] = {
                "result": analysis_result,
                "timestamp": datetime.utcnow().isoformat(),
                "input_context": current_context
            }
            
            # Update global context
            self.shared_state["global_context"]["analysis_insights"] = analysis_result
    
    async def execute_validation_pattern(self):
        """Validation pattern that uses analysis results from shared state."""
        validator_agent = Agent(
            name="validator",
            instruction="Validate analysis results using shared state.",
            server_names=["validation_service"]
        )
        
        async with validator_agent:
            llm = await validator_agent.attach_llm(OpenAIAugmentedLLM)
            
            # Access analysis results from shared state
            analysis_result = self.shared_state["pattern_results"]["analysis"]["result"]
            
            validation_result = await llm.generate_str(f"""
            Validate analysis result. Start the reply with VALID or INVALID.
            Analysis to validate: {analysis_result}
            Global context: {self.shared_state['global_context']}
            """)
            
            # Update shared state
            self.shared_state["pattern_results"]["validation"] = {
                "validation_result": validation_result,
                "validated_analysis": analysis_result,
                "timestamp": datetime.utcnow().isoformat()
            }
            
            # Update global context based on validation. Test the leading
            # verdict: a substring check for "valid" would also match "invalid".
            is_valid = validation_result.strip().upper().startswith("VALID")
            self.shared_state["global_context"]["validation_status"] = is_valid
    
    async def execute_enhancement_pattern(self):
        """Enhancement pattern that uses all previous results."""
        enhancer_agent = Agent(
            name="enhancer",
            instruction="Enhance results using all available shared state.",
            server_names=["enhancement_service"]
        )
        
        async with enhancer_agent:
            llm = await enhancer_agent.attach_llm(OpenAIAugmentedLLM)
            
            # Use all shared state for enhancement
            all_results = self.shared_state["pattern_results"]
            global_context = self.shared_state["global_context"]
            
            enhancement_result = await llm.generate_str(f"""
            Enhance results using all available information:
            All Pattern Results: {all_results}
            Global Context: {global_context}
            """)
            
            # Final state update
            self.shared_state["pattern_results"]["enhancement"] = {
                "enhanced_result": enhancement_result,
                "used_results": list(all_results.keys()),
                "timestamp": datetime.utcnow().isoformat()
            }
    
    async def execute_synthesis_pattern(self):
        """Final synthesis pattern that creates comprehensive output."""
        synthesizer_agent = Agent(
            name="synthesizer",
            instruction="Synthesize all shared state into final comprehensive result.",
            server_names=["synthesis_engine"]
        )
        
        async with synthesizer_agent:
            llm = await synthesizer_agent.attach_llm(OpenAIAugmentedLLM)
            
            synthesis = await llm.generate_str(f"""
            Synthesize comprehensive final result from all shared state:
            Complete State: {self.shared_state}
            """)
            
            self.shared_state["pattern_results"]["synthesis"] = {
                "final_synthesis": synthesis,
                "synthesized_patterns": list(self.shared_state["pattern_results"].keys()),
                "timestamp": datetime.utcnow().isoformat()
            }
    
    async def generate_execution_summary(self) -> dict:
        """Generate summary of workflow execution."""
        return {
            "executed_patterns": list(self.shared_state["pattern_results"].keys()),
            "execution_duration": "calculated_duration",  # placeholder: compute from recorded start and end times
            "state_updates": len(self.shared_state["communication_log"]),
            "final_context_keys": list(self.shared_state["global_context"].keys())
        }
```
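
The `execution_duration` value in the summary above is a placeholder string. The following is a minimal sketch of one way to measure it, assuming a small helper class created when the workflow starts. `ExecutionTimer` is a hypothetical name used for illustration and is not part of mcp-agent.

```python
import time
from datetime import datetime, timezone


class ExecutionTimer:
    """Hypothetical helper: records a start time and reports elapsed seconds."""

    def __init__(self) -> None:
        # Wall-clock timestamp for reporting, monotonic clock for measuring
        self.started_at = datetime.now(timezone.utc)
        self._start = time.monotonic()

    def elapsed_seconds(self) -> float:
        return time.monotonic() - self._start

    def summary(self) -> dict:
        return {
            "started_at": self.started_at.isoformat(),
            "execution_duration_seconds": round(self.elapsed_seconds(), 3),
        }
```

A workflow could create the timer at the top of `run` and merge `timer.summary()` into the dictionary returned by `generate_execution_summary`. The monotonic clock is used for the measurement so that system clock adjustments do not distort the duration.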

## Advanced Coordination Patterns

### Event-Driven Coordination

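Instead of calling patterns in a fixed order, a coordinator can react to events. Each pattern emits an event when it starts, completes, or fails, and the coordinator responds by starting follow-up or recovery patterns: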

```python
import asyncio
from dataclasses import dataclass
from enum import Enum
from typing import Awaitable, Callable, Dict, List

class EventType(Enum):
    PATTERN_STARTED = "pattern_started"
    PATTERN_COMPLETED = "pattern_completed"
    DATA_UPDATED = "data_updated"
    ERROR_OCCURRED = "error_occurred"
    THRESHOLD_REACHED = "threshold_reached"

@dataclass
class WorkflowEvent:
    event_type: EventType
    source_pattern: str
    data: dict
    timestamp: str

@app.workflow
class EventDrivenCoordinator(Workflow[dict]):
    """Event-driven coordination between workflow patterns."""
    
    def __init__(self, *args, **kwargs):
        # Forward constructor arguments so the base Workflow is initialized
        super().__init__(*args, **kwargs)
        self.event_queue: List[WorkflowEvent] = []
        self.pattern_states: Dict[str, str] = {}
        # PATTERN_STARTED has no handler; process_events() skips unhandled types
        self.event_handlers: Dict[EventType, Callable[[WorkflowEvent], Awaitable[None]]] = {
            EventType.PATTERN_COMPLETED: self.handle_pattern_completion,
            EventType.DATA_UPDATED: self.handle_data_update,
            EventType.ERROR_OCCURRED: self.handle_error,
            EventType.THRESHOLD_REACHED: self.handle_threshold
        }
    
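    @app.workflow_run
    async def run(self, config: dict) -> WorkflowResult[dict]:
        # Initialize event-driven execution
        await self.initialize_patterns(config)
        
        processed_events = 0
        
        # Event processing loop: keep going while events are queued, so the
        # completion event of the last running pattern can still start successors
        while self.event_queue or not self.all_patterns_complete():
            # process_events() drains the queue, so count events before the call
            processed_events += len(self.event_queue)
            await self.process_events()
            
            # Check for new triggers
            await self.check_triggers()
            
            # Give running patterns time to progress before the next iteration
            await asyncio.sleep(1)
        
        return WorkflowResult(value={
            "execution_results": self.pattern_states,
            "processed_events": processed_events,
            "completion_time": datetime.utcnow().isoformat()
        })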
    
    async def initialize_patterns(self, config: dict):
        """Initialize patterns based on configuration."""
        patterns_to_start = config.get("initial_patterns", ["data_ingestion"])
        
        for pattern_name in patterns_to_start:
            await self.start_pattern(pattern_name, config)
    
    async def start_pattern(self, pattern_name: str, config: dict):
        """Start a pattern and emit start event."""
        self.pattern_states[pattern_name] = "running"
        
        # Emit pattern started event
        event = WorkflowEvent(
            event_type=EventType.PATTERN_STARTED,
            source_pattern=pattern_name,
            data={"config": config},
            timestamp=datetime.utcnow().isoformat()
        )
        self.event_queue.append(event)
        
        # Execute pattern asynchronously
        asyncio.create_task(self.execute_pattern_async(pattern_name, config))
    
    async def execute_pattern_async(self, pattern_name: str, config: dict):
        """Execute pattern and emit completion event."""
        try:
            # Pattern execution logic
            pattern_agent = Agent(
                name=f"{pattern_name}_executor",
                instruction=f"Execute {pattern_name} pattern according to configuration.",
                server_names=config.get("required_services", ["general"])
            )
            
            async with pattern_agent:
                llm = await pattern_agent.attach_llm(OpenAIAugmentedLLM)
                result = await llm.generate_str(f"Execute {pattern_name}: {config}")
            
            # Update pattern state
            self.pattern_states[pattern_name] = "completed"
            
            # Emit completion event
            completion_event = WorkflowEvent(
                event_type=EventType.PATTERN_COMPLETED,
                source_pattern=pattern_name,
                data={"result": result, "status": "success"},
                timestamp=datetime.utcnow().isoformat()
            )
            self.event_queue.append(completion_event)
            
        except Exception as e:
            # Update state and emit error event
            self.pattern_states[pattern_name] = "failed"
            
            error_event = WorkflowEvent(
                event_type=EventType.ERROR_OCCURRED,
                source_pattern=pattern_name,
                data={"error": str(e), "status": "failed"},
                timestamp=datetime.utcnow().isoformat()
            )
            self.event_queue.append(error_event)
    
    async def process_events(self):
        """Process all queued events."""
        events_to_process = self.event_queue.copy()
        self.event_queue.clear()
        
        for event in events_to_process:
            handler = self.event_handlers.get(event.event_type)
            if handler:
                await handler(event)
    
    async def handle_pattern_completion(self, event: WorkflowEvent):
        """Handle pattern completion event."""
        completed_pattern = event.source_pattern
        
        # Determine next patterns to start based on completion
        next_patterns = self.get_next_patterns(completed_pattern)
        
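        for next_pattern in next_patterns:
            # Several upstream patterns can share a successor, so skip any
            # pattern that is already running or has already completed
            if self.pattern_states.get(next_pattern) not in ("running", "completed"):
                await self.start_pattern(next_pattern, event.data)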
    
    async def handle_data_update(self, event: WorkflowEvent):
        """Handle data update event."""
        # Check if update triggers new patterns or threshold events
        data_size = len(str(event.data))
        
        if data_size > 10000:  # Large data threshold
            threshold_event = WorkflowEvent(
                event_type=EventType.THRESHOLD_REACHED,
                source_pattern=event.source_pattern,
                data={"threshold": "large_data", "size": data_size},
                timestamp=datetime.utcnow().isoformat()
            )
            self.event_queue.append(threshold_event)
    
    async def handle_error(self, event: WorkflowEvent):
        """Handle error event."""
        failed_pattern = event.source_pattern
        
        # Implement error recovery logic
        recovery_patterns = self.get_recovery_patterns(failed_pattern)
        
        for recovery_pattern in recovery_patterns:
            await self.start_pattern(recovery_pattern, {
                "recovery_mode": True,
                "failed_pattern": failed_pattern,
                "error_details": event.data
            })
    
    async def handle_threshold(self, event: WorkflowEvent):
        """Handle threshold reached event."""
        threshold_type = event.data.get("threshold")
        
        if threshold_type == "large_data":
            # Start parallel processing pattern for large data
            await self.start_pattern("parallel_processing", event.data)
    
    def get_next_patterns(self, completed_pattern: str) -> List[str]:
        """Get patterns that should start after completion."""
        pattern_dependencies = {
            "data_ingestion": ["data_validation", "initial_analysis"],
            "data_validation": ["data_processing"],
            "initial_analysis": ["detailed_analysis"],
            "data_processing": ["result_synthesis"],
            "detailed_analysis": ["result_synthesis"],
            "parallel_processing": ["result_aggregation"],
            "result_synthesis": ["final_reporting"],
            "result_aggregation": ["final_reporting"]
        }
        
        return pattern_dependencies.get(completed_pattern, [])
    
    def get_recovery_patterns(self, failed_pattern: str) -> List[str]:
        """Get recovery patterns for failed patterns."""
        recovery_map = {
            "data_ingestion": ["data_ingestion_retry"],
            "data_processing": ["alternative_processing"],
            "detailed_analysis": ["fallback_analysis"]
        }
        
        return recovery_map.get(failed_pattern, [])
    
    def all_patterns_complete(self) -> bool:
        """Check if all patterns are complete."""
        active_states = ["running", "pending"]
        return not any(state in active_states for state in self.pattern_states.values())
    
    async def check_triggers(self):
        """Check for external triggers that might start new patterns."""
        # This could check external systems, databases, APIs, etc.
        # For now, it's a placeholder for trigger logic
        pass
```
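
`start_pattern` above calls `asyncio.create_task` without keeping the returned task. The event loop holds only weak references to tasks, so the Python documentation recommends saving a reference until the task finishes. The following is a minimal sketch of that idea, assuming a hypothetical `BackgroundTasks` helper that is not part of mcp-agent.

```python
import asyncio
from typing import Any, Coroutine, Set


class BackgroundTasks:
    """Hypothetical helper: keeps strong references to tasks until they finish."""

    def __init__(self) -> None:
        self._tasks: Set[asyncio.Task] = set()

    def spawn(self, coro: Coroutine[Any, Any, Any]) -> asyncio.Task:
        # Must be called from inside a running event loop
        task = asyncio.create_task(coro)
        self._tasks.add(task)
        # Drop the reference automatically once the task is done
        task.add_done_callback(self._tasks.discard)
        return task

    def __len__(self) -> int:
        return len(self._tasks)

    async def wait_all(self) -> None:
        """Wait for every task that is still tracked."""
        if self._tasks:
            await asyncio.gather(*self._tasks, return_exceptions=True)


async def demo() -> list:
    finished = []

    async def pattern(name: str) -> None:
        # Stand-in for real pattern execution
        await asyncio.sleep(0)
        finished.append(name)

    tasks = BackgroundTasks()
    tasks.spawn(pattern("data_ingestion"))
    tasks.spawn(pattern("data_validation"))
    await tasks.wait_all()
    return sorted(finished)
```

A coordinator could hold one such helper and call its `spawn` method wherever it currently calls `asyncio.create_task`, then await `wait_all()` before returning its result so that no pattern is abandoned mid-execution.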

## Best Practices for Pattern Composition

<AccordionGroup>
  <Accordion title="Design for Composability">
    - Keep patterns focused on single responsibilities
    - Use well-defined interfaces between patterns
    - Make patterns stateless when possible
    - Document pattern dependencies clearly
    
    ```python
    # Good: Single responsibility pattern
    @app.workflow
    class DataValidationPattern(Workflow[dict]):
        """Focuses solely on data validation."""
        pass
    
    # Avoid: Pattern that tries to do everything
    @app.workflow  
    class DataEverythingPattern(Workflow[dict]):
        """Validates, processes, analyzes, and reports data."""
        pass
    ```
  </Accordion>
  
  <Accordion title="Handle Pattern Failures">
    - Implement graceful degradation
    - Use circuit breaker patterns
    - Provide fallback mechanisms
    - Log failures for debugging
    
    ```python
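    import logging

    logger = logging.getLogger(__name__)

    async def execute_with_fallback(self, primary_pattern, fallback_pattern, data):
        """Run the primary pattern; on any failure, log it and run the fallback."""
        try:
            return await primary_pattern(data)
        except Exception as e:
            logger.warning(f"Primary pattern failed: {e}, using fallback")
            return await fallback_pattern(data)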
    ```
  </Accordion>
  
  <Accordion title="Optimize Resource Usage">
    - Share resources between patterns when possible
    - Use connection pooling for external services
    - Implement proper cleanup in patterns
    - Monitor resource consumption
    
    ```python
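    # AgentPool is an illustrative placeholder; substitute your own pooling helper
    @app.workflow
    class ResourceEfficientPattern(Workflow[dict]):
        def __init__(self, *args, **kwargs):
            super().__init__(*args, **kwargs)
            self.shared_agent_pool = AgentPool(max_size=5)
        
        async def cleanup(self):
            # Release pooled agents once the pattern is done
            await self.shared_agent_pool.close()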
    ```
  </Accordion>
  
  <Accordion title="Test Pattern Compositions">
    - Test patterns in isolation
    - Test pattern interactions
    - Use mocks for external dependencies
    - Validate error handling paths
    
    ```python
    import pytest

    # ComposedWorkflow stands in for the composed workflow under test
    @pytest.mark.asyncio
    async def test_pattern_composition():
        mock_config = {"test": True}
        
        workflow = ComposedWorkflow()
        result = await workflow.run(mock_config)
        
        assert result.value["pattern_1_complete"] is True
        assert result.value["pattern_2_complete"] is True
    ```
  </Accordion>
</AccordionGroup>
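
The circuit breaker mentioned under "Handle Pattern Failures" can be sketched as follows. This is a minimal illustration, not an mcp-agent API: `CircuitBreaker`, `CircuitOpenError`, and the `failure_threshold`, `reset_timeout`, and `clock` parameters are all assumed names. After a configurable number of consecutive failures the breaker rejects calls until a timeout passes, then lets one trial call through.

```python
import time
from typing import Any, Awaitable, Callable, Optional


class CircuitOpenError(RuntimeError):
    """Raised when a call is rejected because the circuit is open."""


class CircuitBreaker:
    """Hypothetical breaker that wraps an async pattern callable."""

    def __init__(
        self,
        failure_threshold: int = 3,
        reset_timeout: float = 30.0,
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self.failure_threshold = failure_threshold
        self.reset_timeout = reset_timeout
        self._clock = clock
        self._failures = 0
        self._opened_at: Optional[float] = None

    @property
    def is_open(self) -> bool:
        if self._opened_at is None:
            return False
        # Once the timeout has passed, allow a trial call (half-open state)
        return self._clock() - self._opened_at < self.reset_timeout

    async def call(self, pattern: Callable[[Any], Awaitable[Any]], data: Any) -> Any:
        if self.is_open:
            raise CircuitOpenError("pattern temporarily disabled")
        try:
            result = await pattern(data)
        except Exception:
            self._failures += 1
            if self._failures >= self.failure_threshold:
                # Open (or re-open) the circuit and restart the timeout
                self._opened_at = self._clock()
            raise
        # A success closes the circuit and resets the failure count
        self._failures = 0
        self._opened_at = None
        return result
```

A breaker like this combines naturally with `execute_with_fallback`: wrap the primary pattern in `breaker.call`, and the fallback runs both when the primary fails and when the breaker rejects the call outright. The injectable `clock` exists only to make the timeout testable without sleeping.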

## Next Steps

<CardGroup cols={2}>
  <Card title="Monitoring & Observability" icon="chart-line" href="/advanced/monitoring">
    Set up comprehensive monitoring for composed workflows
  </Card>
  <Card title="Temporal Integration" icon="clock" href="/advanced/temporal">
    Deploy pattern compositions with Temporal for production durability
  </Card>
  <Card title="Workflow Examples" icon="code" href="/workflows/overview">
    Explore complete workflow pattern examples
  </Card>
  <Card title="Production Deployment" icon="rocket" href="/cloud/deployment-quickstart">
    Deploy composed workflows to production
  </Card>
</CardGroup>
